{-#LANGUAGE RecordWildCards #-}
{-#LANGUAGE ScopedTypeVariables#-}
{-#LANGUAGE NamedFieldPuns#-}
{-#LANGUAGE OverloadedStrings#-}
{-#LANGUAGE MultiWayIf#-}
module Libp2p.Mux.Internal where
import Prelude hiding (log)
import Libp2p.Mux.Types
import Libp2p.Mux.Const
import Libp2p.Mux.Config
import qualified Control.Concurrent.STM.TBQueue as TB
import qualified Network.Socket as NS
import Control.Concurrent
import qualified Control.Concurrent.STM.TVar as TVar
import qualified Control.Concurrent.STM.TMVar as TMVar
import qualified Control.Concurrent.MVar as MVar
import qualified Data.Map as Map
import qualified Control.Concurrent.STM.TChan as TC
import qualified Data.ByteString as BS
import Data.Word
import qualified Control.Monad.STM as STM
import qualified Control.Concurrent.Actor as Act
import Control.Monad (when)
import qualified System.Log.FastLogger as Log
import qualified Network.Socket.ByteString  as NBS
import Control.Monad.Trans.Except
import Control.Monad.Trans.Class
import Control.Concurrent.Async
import Data.Serialize
import Data.Bits
import Control.Error.Safe
import Data.Map
import Data.Maybe
import Data.IORef
import Data.Time.Clock
import qualified Data.List as L

-- | Build a 'Session' on top of the given socket and start its workers:
-- the socket writer thread ('sendLoop'), the main session actor
-- ('mainLoop'), the receive actor ('recvLoop') and, when @enableKeepAlive@
-- is set in the 'Config', the keep-alive actor ('keepAliveLoop').
--
-- The 'Bool' is the client flag: locally opened stream ids start at 1 when
-- it is 'True' and at 2 otherwise.
mkSession::NS.Socket -> Config -> Bool -> IO Session
mkSession conn config@Config{..} isClient = do
  -- NOTE(review): the second component returned by newFastLogger is dropped,
  -- so it is never invoked by this module.
  (log, _ ) <- Log.newFastLogger logOutput
  shutdownCh <- TMVar.newEmptyTMVarIO
  pingID <- newIORef 0
  pings  <- newIORef Map.empty
  sendChannel <- TC.newTChanIO
  remoteGoAway <- newIORef False
  localGoAway <- newIORef False
  nextStreamID <- newIORef (if isClient then 1 else 2)
  sendDownCh <- TMVar.newEmptyTMVarIO
  -- forkFinally guarantees that sendDownCh is filled even when the writer
  -- dies with an exception; otherwise innerCloseSession, which waits on
  -- sendDownCh, would block forever.
  _ <- forkFinally (sendLoop conn shutdownCh sendChannel sendDownCh)
                   (\_ -> STM.atomically $ TMVar.tryPutTMVar sendDownCh () >> pure ())
  inflight <- newIORef Map.empty
  streams <- newIORef Map.empty
  -- The names bound above and below are the Session record fields; the
  -- record is assembled with RecordWildCards, so they must not be renamed.
  sess <- STM.atomically $ do
   recvActor <- TVar.newTVar Nothing
   mainActor <- TVar.newTVar Nothing
   isShutdown <- TVar.newTVar False
   shutdownErr <- TVar.newTVar ""
   recvDownCh <- TMVar.newEmptyTMVar

   synCh <- TB.newTBQueue acceptBacklog
   acceptCh <- TB.newTBQueue acceptBacklog
   pure Session{..}
  mayAlive <- if enableKeepAlive
                then Just <$> (Act.actor sess keepAliveLoop)
                else pure Nothing
  mainAct <- Act.actor sess mainLoop
  recvAct <- Act.actor (sess,mainAct,mayAlive) recvLoop
  -- The actors need the session to start, so their handles are published
  -- into the session only after both exist.
  STM.atomically $ do
     TVar.writeTVar (mainActor sess) (Just mainAct)
     TVar.writeTVar (recvActor sess) (Just recvAct)
  return sess

-- | Queue one frame on the outgoing channel: the encoded header, followed by
-- the payload when one is given.
--
-- Returns 'Nothing' once the frame is queued, or 'Just' an error message
-- when the shutdown variable is already filled.
sendMessage::TMVar.TMVar () -> TC.TChan BS.ByteString -> Header -> Maybe BS.ByteString -> IO (Maybe String)
sendMessage shutdown ch hdr mayBytes = do
  stillOpen <- STM.atomically (TMVar.isEmptyTMVar shutdown)
  if not stillOpen
    then return (Just "session is shutdown, can't send message")
    else do
      putStrLn ("[INFO] SendMessage: " <> show hdr)
      STM.atomically (TC.writeTChan ch frame)
      return Nothing
 where
  -- Header bytes, with the payload appended when present.
  frame = maybe (encode hdr) (encode hdr <>) mayBytes

-- | Report whether the session has been shut down, i.e. whether the shutdown
-- variable has been filled.
isClosed::Session -> IO Bool
isClosed sess = not <$> STM.atomically (TMVar.isEmptyTMVar (shutdownCh sess))

-- | Body of the main session actor. It waits for either the shutdown signal
-- or the next 'SessionMainMessage' and handles one message per iteration.
-- The loop ends when the shutdown variable is filled or after 'CloseSession'
-- has been processed; every other message is followed by another iteration.
mainLoop::Session -> Act.MBox SessionMainMessage  -> IO ()
mainLoop sess@Session{..} mbox = do
  -- One STM transaction instead of racing two threads per message.
  next <- STM.atomically $
            (Left <$> TMVar.readTMVar shutdownCh) `STM.orElse`
            (Right <$> TC.readTChan mbox)
  case next of
    Left    _ -> log "[INFO] CloseMainLoop\r\n"
    Right msg -> case msg of
      OpenStream var -> do
        closed <- isClosed sess
        precheck <- runExceptT $ do
                 -- tryAssert fails with the message when the condition is False.
                 tryAssert "Session is stoped" (not closed)
                 rmGoAway <- lift $ readIORef remoteGoAway
                 tryAssert "ErrRemoteGoAway" (not rmGoAway)
        case precheck of
          Left err -> MVar.putMVar var (Left err)
          Right _  -> do
           -- Take one slot of the SYN backlog.
           -- NOTE(review): this blocks the actor while synCh is full; slots
           -- are only released by messages handled in this same loop, so a
           -- full backlog stalls the session until shutdown.
           synRet <- STM.atomically $ ((TB.writeTBQueue synCh ()) >> pure (Right ())) `STM.orElse`
                                      ((TMVar.readTMVar shutdownCh) >> pure (Left ()))
           case synRet of
            Left  _ ->  MVar.putMVar var (Left "session shutdown")
            Right _ -> do
                       sid <- readIORef nextStreamID
                       modifyIORef nextStreamID (+2)
                       stream <- mkStream sess sid StreamInit
                       modifyIORef streams (Map.insert sid stream)
                       modifyIORef inflight (Map.insert sid ())
                       mayError <- sendWindowUpdate stream
                       case mayError of
                        Nothing   -> MVar.putMVar var (Right stream)
                        Just err  -> do
                                      releaseSyn
                                      MVar.putMVar var (Left err)

        mainLoop sess mbox
      HandlerWindowUpdate hdr -> do
        putStrLn $ "HandlerWindowUpdate!!!!!!!!!!!!!!!!!!!!!!" <> show hdr
        handleStreamMessage sess hdr BS.empty
        mainLoop sess mbox
      HandlerReadData hdr buf -> do
        handleStreamMessage sess hdr buf
        mainLoop sess mbox
      GoAway typ -> do
        writeIORef localGoAway True
        let goAwayHdr = mkHeader TypeGoAway 0 0 (fromIntegral typ)
        _ <- sendMessage shutdownCh sendChannel goAwayHdr Nothing
        mainLoop sess mbox
      HandlerGoAway typ -> do
        putStrLn "[INFO] HandlerGoAway"
        if
          | typ == goAwayNormal -> writeIORef remoteGoAway True
          | typ == goAwayProtoErr -> log "[ERROR] mux: received protocol error go away"
          | typ == goAwayInternalErr -> log "[ERROR] mux: received internal error go away"
          | otherwise  -> log "[ERROR] received unexpected go away"
        mainLoop sess mbox
      SendStreamEstablished sid -> do
        modifyIORef inflight (Map.delete (fromIntegral sid))
        releaseSyn
        putStrLn "[INFO] MainLoop SendStreamEstablished"
        mainLoop sess mbox
      CloseStream sid -> do
        log  $ Log.toLogStr $ "[INFO] Close Stream" <> show sid
        inMap <- readIORef inflight
        -- A stream closed while its SYN is still in flight gives its backlog
        -- slot back and is forgotten, so the slot cannot be released twice.
        when (Map.member (fromIntegral sid) inMap) $ do
          releaseSyn
          modifyIORef inflight (Map.delete (fromIntegral sid))
        modifyIORef streams (Map.delete $ fromIntegral sid)
        mainLoop sess mbox
      CloseSession -> do
       -- Terminal message: the loop is not re-entered afterwards.
       ret <- runExceptT $ innerCloseSession sess
       case ret of
        Left l  -> log $ Log.toLogStr l
        Right _ -> pure ()
      GetStreamLength lenVar -> do
       streamMap <- readIORef streams
       STM.atomically $ TMVar.putTMVar lenVar (length streamMap)
       -- Answering a query must not terminate the actor.
       mainLoop sess mbox
 where
  -- Remove one entry from the SYN backlog queue, if there is one.
  -- (Peeking would leave the entry in place and the queue would only grow.)
  releaseSyn::IO ()
  releaseSyn = STM.atomically $ TB.tryReadTBQueue synCh >> pure ()

-- | Post a 'CloseSession' message to the session's main actor.
closeSession::Session -> IO ()
closeSession = flip pushSessionActor CloseSession

-- | Tear the session down. Only the first call does any work:
-- it marks the session as shut down, records @"ErrSessionShutdown"@ when no
-- error was stored yet, fills the shutdown variable, shuts the socket down
-- in both directions, waits for the send and receive loops to report that
-- they stopped and finally force-closes every registered stream.
--
-- The function itself never produces a 'Left'; exceptions raised by the
-- socket shutdown are not caught here.
innerCloseSession::Session -> ExceptT String IO ()
innerCloseSession Session{..} = lift $ do
  -- Test-and-set in one transaction so that concurrent callers cannot both
  -- run the teardown.
  firstCaller <- STM.atomically $ do
    alreadyDown <- TVar.readTVar isShutdown
    if alreadyDown
      then pure False
      else do
        TVar.writeTVar isShutdown True
        shutErr <- TVar.readTVar shutdownErr
        when (shutErr == "") (TVar.writeTVar shutdownErr "ErrSessionShutdown")
        _ <- TMVar.tryPutTMVar shutdownCh ()
        pure True
  when firstCaller $ do
    NS.shutdown conn NS.ShutdownBoth
    -- readTMVar leaves the markers in place; it only waits for them.
    STM.atomically $ TMVar.readTMVar sendDownCh
    STM.atomically $ TMVar.readTMVar recvDownCh
    streamMap <- readIORef streams
    mapM_ forceCloseStream streamMap
    putStrLn "[INFO] CloseSession Success"

-- | Route one stream-level frame (window update or data) to its stream.
--
-- A frame carrying the SYN flag first registers the incoming stream via
-- 'incomingStream'; when that fails the error is logged and the frame is
-- dropped instead of being delivered to whatever stream happens to own the
-- id. Window updates are applied through 'incrSendWindow' (a failure
-- triggers a protocol-error go-away frame); every other frame is handed to
-- the stream actor as 'ReadToBuffer'.
handleStreamMessage::Session -> Header ->  BS.ByteString -> IO ()
handleStreamMessage sess@Session{..} hdr@Header{..} buf
  | isSYN = do
      accepted <- incomingStream sess hdr
      case accepted of
        Left str -> log $ Log.toLogStr $ "[ERROR]" <> show str
        Right _  -> dispatch
  | otherwise = dispatch
 where
  isSYN = (headerFlags .&. flagSYN) == flagSYN

  -- Look the stream up and deliver the frame to it.
  dispatch::IO ()
  dispatch = do
    streamMap <- readIORef streams
    case Map.lookup (fromIntegral headerStreamId) streamMap of
      Nothing -> log "[ERROR] handleStreamMessage mayStream is Nothing\r\n"
      Just stream
        | headerMsgType == TypeWindowUpdate -> do
            mayError <- incrSendWindow stream hdr
            case mayError of
              Nothing -> pure ()
              Just _  -> do
                let goAwayHdr = mkHeader TypeGoAway 0 0 (fromIntegral goAwayProtoErr)
                maySendErr <- sendMessage shutdownCh sendChannel goAwayHdr Nothing
                maybe (pure ()) (const $ log "[WARN] failed to send go away") maySendErr
            log "[INFO] handleStreamMessage incrSendWindow\r\n"
        | otherwise -> pushStreamActor stream $ ReadToBuffer hdr buf

-- | Handle a SYN for a stream opened by the remote side.
--
-- Steps, in order:
--
-- * reject ids whose parity does not match the remote role;
-- * when a local go-away is in effect, answer with RST and stop (no stream
--   is created);
-- * on a duplicate id, mark the local go-away, send a protocol-error
--   go-away frame and fail with @"duplicate stream declared"@;
-- * when the accept queue is full, answer with RST and stop;
-- * otherwise create the stream, register it and queue it for 'acceptStream'.
--
-- The stream (and its actor) is only created once the frame is known to be
-- acceptable, so rejected SYNs do not leave an orphaned actor behind.
incomingStream::Session -> Header -> IO (Either String ())
incomingStream sess@Session{..} Header{..} = runExceptT $ do
    tryAssert "both yamux endpoints are clients" (isClient == (even headerStreamId))
    isLocalGoAway <- lift $ readIORef localGoAway
    if isLocalGoAway
      then sendReset
      else do
        streamMap <- lift $ readIORef streams
        when (Map.member (fromIntegral headerStreamId) streamMap) $ do
          let goAwayHdr = mkHeader TypeGoAway 0 0 (fromIntegral goAwayProtoErr)
          lift $ modifyIORef localGoAway (const True)
          _ <- lift $ sendMessage shutdownCh sendChannel goAwayHdr Nothing
          throwE "duplicate stream declared"
        isFull <- lift $ STM.atomically $ TB.isFullTBQueue acceptCh
        if isFull
          then do
            lift $ log "[WARN] backlog exceeded, forcing connection reset"
            sendReset
          else lift $ do
            newStream <- mkStream sess (fromIntegral headerStreamId) StreamSYNReceived
            modifyIORef streams (Map.insert (fromIntegral headerStreamId) newStream)
            STM.atomically $ TB.writeTBQueue acceptCh newStream
 where
  -- Send a window-update frame with the RST flag for this stream id; a
  -- failure to queue the frame becomes the error of the whole operation.
  sendReset::ExceptT String IO ()
  sendReset = do
    let hdr = mkHeader TypeWindowUpdate flagRST (fromIntegral headerStreamId) 0
    mayError <- lift $ sendMessage shutdownCh sendChannel hdr Nothing
    maybe (pure ()) throwE mayError

-- | Events handled by one iteration of 'keepAliveLoop': the interval timer
-- fired, the session shut down, or a ping frame (id, SYN flag) arrived.
data KeepAliveLoopMsg = SendPing | CloseKeepAlive | HandlerPing Int Bool

-- | Body of the keep-alive actor. Each iteration waits for the first of:
-- the keep-alive timer, the session shutdown signal, or an incoming ping
-- forwarded by the receive loop as @(ping id, is SYN)@.
--
-- * Timer: allocate the next ping id, remember when it was sent and send a
--   ping frame with the SYN flag.
-- * Incoming SYN ping: answer with an ACK ping carrying the same id.
-- * Incoming non-SYN ping: forget the matching outstanding ping.
-- * Shutdown: log and stop.
keepAliveLoop::Session -> Act.MBox (Int,Bool) -> IO ()
keepAliveLoop sess@Session{..} mbox = do
  -- registerDelay flips the TVar to True after the given number of
  -- microseconds, so no helper thread has to be forked per iteration.
  timer <- TVar.registerDelay ((keepAliveInterval config) * 1000)
  msg <- STM.atomically $
          (TVar.readTVar timer >>= STM.check >> pure SendPing) `STM.orElse`
          (TMVar.readTMVar shutdownCh >> pure CloseKeepAlive) `STM.orElse`
          (uncurry HandlerPing <$> TC.readTChan mbox)
  case msg of
    SendPing       -> do
      pid <- readIORef pingID
      let newPid = pid + 1
      writeIORef pingID newPid
      nowTime <- getCurrentTime
      -- Keep only pings sent within the timeout window ending now; the
      -- cutoff lies in the past (now - timeout), not in the future.
      -- NOTE(review): connectionWriteTimeout is converted with fromIntegral,
      -- i.e. interpreted as seconds here - confirm the unit of that setting.
      let timeout    = fromIntegral (connectionWriteTimeout config)
          oldestLive = addUTCTime (negate timeout) nowTime
      modifyIORef pings (Map.insert newPid nowTime . Map.filter (> oldestLive))
      let pingHeader = mkHeader TypePing flagSYN 0 (fromIntegral newPid)
      _ <- sendMessage shutdownCh sendChannel pingHeader Nothing
      keepAliveLoop sess mbox
    CloseKeepAlive        -> log "[INFO] CloseKeepAlive\r\n"
    HandlerPing hPid isSyn -> do
      if isSyn
        then do
          _ <- sendMessage shutdownCh sendChannel (mkHeader TypePing flagACK 0 (fromIntegral hPid)) Nothing
          pure ()
        else modifyIORef pings (Map.delete hPid)
      keepAliveLoop sess mbox


-- | Body of the receive actor: read frames from the socket and forward them.
--
-- Ping frames go to the keep-alive actor (when there is one) as
-- @(header length field, is SYN)@; data, go-away and window-update frames go
-- to the main session actor. The loop stops, and fills @recvDownCh@, when
-- the peer closes the connection, a header cannot be decoded, the protocol
-- version does not match, or the connection ends in the middle of a frame.
--
-- A single socket read may return fewer bytes than requested, so headers
-- and payloads are collected with 'recvExact' until complete.
recvLoop::(Session,Act.Actor SessionMainMessage,Maybe (Act.Actor (Int,Bool))) -> Act.MBox String -> IO ()
recvLoop (sess@Session{..},mainAct,mayAliveActor) mbox = do
  mayHdrBytes <- recvExact headerSize
  case mayHdrBytes of
    Nothing -> signalDown
    Just hdrBytes ->
       case (decode hdrBytes) of
         Left str  ->  do
                   putStrLn $ "[ERROR] Recv Error: " <> show str
                   signalDown
         Right hdr -> handler hdr
 where
  -- Tell whoever waits in innerCloseSession that this loop has stopped.
  signalDown::IO ()
  signalDown = STM.atomically $ TMVar.tryPutTMVar recvDownCh () >> pure ()

  -- Read exactly the requested number of bytes; Nothing when the peer
  -- closes the connection first. Requests of zero bytes succeed at once.
  recvExact::Int -> IO (Maybe BS.ByteString)
  recvExact wanted = collect wanted []
   where
    collect::Int -> [BS.ByteString] -> IO (Maybe BS.ByteString)
    collect todo acc
      | todo <= 0 = pure $ Just $ BS.concat (reverse acc)
      | otherwise = do
          -- Bound each read so that a huge length field cannot force one
          -- huge buffer allocation up front.
          chunk <- NBS.recv conn (min todo maxRecvChunk)
          if BS.null chunk
            then pure Nothing
            else collect (todo - BS.length chunk) (chunk : acc)
    maxRecvChunk::Int
    maxRecvChunk = 65536

  handler::Header -> IO ()
  handler hdr@Header{..} = do
    if (headerVersion /= protoVersion)
      then do
            putStrLn $ "[ERROR] mux: Invalid protocol version: " <> (show $ headerVersion)
            signalDown
      else do
            -- Each branch reports whether the loop should keep reading.
            keepGoing <- case headerMsgType of
                  TypePing   -> do
                    let isSYN = (headerFlags .&. flagSYN) == flagSYN
                    maybe (pure ())
                          (\aliveAct -> Act.send aliveAct (fromIntegral headerLength,isSYN) >> pure ())
                          mayAliveActor
                    pure True
                  TypeData   -> do
                    mayBuf <- recvExact (fromIntegral headerLength)
                    case mayBuf of
                      Nothing  -> pure False
                      Just buf -> do
                        _ <- Act.send mainAct $ HandlerReadData hdr buf
                        pure True
                  TypeGoAway -> do
                    _ <- Act.send mainAct $ HandlerGoAway (fromIntegral headerLength)
                    pure True
                  TypeWindowUpdate -> do
                    _ <- Act.send mainAct $ HandlerWindowUpdate hdr
                    pure True
            if keepGoing
              then recvLoop (sess,mainAct,mayAliveActor) mbox
              else signalDown


-- | Socket writer: take encoded frames from the outgoing channel and write
-- them to the socket until the shutdown variable is filled, then fill the
-- "send loop is down" variable (last argument).
--
-- Frames are written with @sendAll@ because a plain @send@ may transmit
-- only a prefix of the buffer, which would corrupt the frame stream.
sendLoop::NS.Socket ->  TMVar.TMVar () -> TC.TChan BS.ByteString -> TMVar.TMVar () -> IO ()
sendLoop conn shutdownCh sendChannel sendDownCh = do
  -- A single transaction; shutdown is checked first, so frames still queued
  -- at shutdown time are not written.
  next <- STM.atomically $
            (Left <$> TMVar.readTMVar shutdownCh) `STM.orElse`
            (Right <$> TC.readTChan sendChannel)
  case next of
    Left _   -> STM.atomically $ TMVar.tryPutTMVar sendDownCh () >> pure ()
    Right bs -> NBS.sendAll conn bs >> sendLoop conn shutdownCh sendChannel sendDownCh


-- | Ask the main session actor to open a new outgoing stream and wait for
-- its answer. Fails when no main actor is registered or when the actor
-- replies with an error.
openStream::Session -> ExceptT String IO Stream
openStream Session{..} =
  lift (TVar.readTVarIO mainActor) >>= maybe (throwE "[ERROR] MainActor is stoped") request
 where
  -- Post the request together with a fresh reply slot and block on the slot.
  request act = ExceptT $ do
    reply <- MVar.newEmptyMVar
    _ <- Act.send act (OpenStream reply)
    MVar.readMVar reply

-- | Wait for the next stream opened by the remote side. Fails with
-- @"shutdown"@ when the session is shut down while no stream is queued.
acceptStream::Session -> ExceptT String IO Stream
acceptStream Session{..} = ExceptT $ STM.atomically $
  (Right <$> TB.readTBQueue acceptCh) `STM.orElse`
  (Left "shutdown" <$ TMVar.readTMVar shutdownCh)

-- | Post a normal go-away request to the main session actor; does nothing
-- when no main actor is registered.
goAway::Session -> IO ()
goAway sess = pushSessionActor sess (GoAway goAwayNormal)

-- | Post a 'CloseStream' message for the given stream id to the main
-- session actor.
closeStream::Session -> Int -> IO ()
closeStream sess = pushSessionActor sess . CloseStream

-- | Ask the main session actor how many streams are registered and wait for
-- the answer. Blocks until the actor replies.
numStreams::Session -> IO Int
numStreams sess = do
  reply <- TMVar.newEmptyTMVarIO
  pushSessionActor sess (GetStreamLength reply)
  STM.atomically (TMVar.readTMVar reply)

-- | Record the given error as the session's shutdown error, unless one is
-- already stored, and then request the session to close.
exitErrSession::Session -> String -> IO ()
exitErrSession sess@Session{..} msg = do
  STM.atomically $ do
   errStr <- TVar.readTVar shutdownErr
   -- Store the caller's message (not the empty value that was just read).
   when (errStr == "") $ TVar.writeTVar shutdownErr msg
  closeSession sess
---------------

-- | Create a stream with the given id and initial state, start its actor
-- ('streamLoop') and publish the actor handle in the stream.
--
-- Both windows start at @initialStreamWindow@ and the receive buffer starts
-- empty. The local names below are the 'Stream' record fields (the record is
-- built with RecordWildCards), so they must keep these exact names.
mkStream::Session -> Integer -> StreamState -> IO Stream
mkStream sess sid state = do
    let streamId = sid
    recvWindow   <- TVar.newTVarIO initialStreamWindow
    sendWindow   <- TVar.newTVarIO initialStreamWindow
    streamState  <- TVar.newTVarIO state
    recvBuf      <- TVar.newTVarIO BS.empty
    streamActor  <- TVar.newTVarIO Nothing
    sendNotifyCh <- TMVar.newEmptyTMVarIO
    recvNotifyCh <- TMVar.newEmptyTMVarIO
    let stream = Stream{..}
    -- The actor needs the stream value, so its handle is stored afterwards.
    worker <- Act.actor stream streamLoop
    STM.atomically (TVar.writeTVar (streamActor stream) (Just worker))
    return stream

-- | Ask the stream actor to send a window update and wait for its answer
-- ('Nothing' on success, 'Just' an error message otherwise).
sendWindowUpdate::Stream -> IO (Maybe String)
sendWindowUpdate s =
  MVar.newEmptyMVar >>= \reply ->
    pushStreamActor s (SendWindowUpdate reply) >> MVar.readMVar reply

-- | Hand a received window-update header to the stream actor and wait for
-- its answer ('Nothing' on success, 'Just' an error message otherwise).
incrSendWindow::Stream -> Header -> IO (Maybe String)
incrSendWindow s hdr =
  MVar.newEmptyMVar >>= \reply ->
    pushStreamActor s (IncrSendWindow hdr reply) >> MVar.readMVar reply

-- | Body of the per-stream actor: take one 'StreamMessage' from the mailbox,
-- handle it, and repeat forever.
streamLoop::Stream -> Act.MBox StreamMessage -> IO ()
streamLoop stream@Stream{..} mbox = do
  STM.atomically (TC.readTChan mbox) >>= dispatch
  streamLoop stream mbox
 where
  dispatch::StreamMessage -> IO ()
  -- Send a window update and report the outcome to the requester.
  dispatch (SendWindowUpdate var) = do
    putStrLn "[INFO] streamLoop SendWindowUpdate"
    innerSendWindowUpdate stream >>= MVar.putMVar var
  -- Apply the frame's flags, then grow the send window by the header length.
  dispatch (IncrSendWindow Header{..} var) = do
    putStrLn "[INFO] innerIncrSendWindow"
    flagErr <- processFlags stream (fromIntegral headerFlags)
    case flagErr of
      Just errStr -> MVar.putMVar var (Just errStr)
      Nothing -> do
        innerIncrSendWindow sendWindow (fromIntegral headerLength) headerFlags sendNotifyCh
        MVar.putMVar var Nothing
  -- Apply the frame's flags, then append the payload to the receive buffer
  -- if it fits into the receive window, and notify a waiting reader.
  dispatch (ReadToBuffer Header{..} buf) = do
    putStrLn "[INFO] Enter streamLoop ReadToBuffer"
    flagErr <- processFlags stream (fromIntegral headerFlags)
    case flagErr of
      Just errStr -> (log sess) (Log.toLogStr $ "[ERROR] " <> errStr)
      Nothing -> do
        winSize <- TVar.readTVarIO recvWindow
        if fromIntegral headerLength <= winSize
          then STM.atomically $ do
                 TVar.modifyTVar recvBuf (<> buf)
                 TVar.modifyTVar recvWindow (subtract (fromIntegral headerLength))
                 _ <- TMVar.tryPutTMVar recvNotifyCh ()
                 pure ()
          else (log sess) "[ERROR] receive window exceeded"

-- | Compute how much the receive window can grow
-- (@maxStreamWindowSize - buffered bytes - current window@) and announce it
-- to the peer with a window-update frame.
--
-- Nothing is sent when the growth is below half of the maximum window and
-- 'sendFlags' yields no flags. Note that 'sendFlags' is always invoked, so
-- its state transition happens even when no frame is sent.
innerSendWindowUpdate::Stream -> IO (Maybe String)
innerSendWindowUpdate s@Stream{..} = do
  buffered <- BS.length <$> TVar.readTVarIO recvBuf
  window   <- TVar.readTVarIO recvWindow
  flags    <- sendFlags s
  let limit = maxStreamWindowSize (config sess)
      delta = (limit - buffered) - window
      quiet = delta < (limit `div` 2) && flags == 0
  if quiet
    then pure Nothing
    else do
      STM.atomically $ TVar.writeTVar recvWindow (window + delta)
      let hdr = mkHeader TypeWindowUpdate flags (fromIntegral streamId) (fromIntegral delta)
      sendMessage (shutdownCh sess) (sendChannel sess) hdr Nothing

-- | Atomically add the given amount to the send window and post a
-- notification on the given variable (a no-op when one is already pending).
-- The flags argument is not used.
innerIncrSendWindow::TVar.TVar Int -> Int -> Word16 -> TMVar.TMVar () -> IO ()
innerIncrSendWindow tSendWindow hdrLength flags sendCh =
  STM.atomically $ do
    TVar.modifyTVar tSendWindow (+ hdrLength)
    _ <- TMVar.tryPutTMVar sendCh ()
    pure ()


-- | Determine the flags for the next outgoing frame and advance the stream
-- state accordingly:
--
-- * 'StreamInit' becomes 'StreamSYNSent' and yields @flagSYN@;
-- * 'StreamSYNReceived' becomes 'StreamEstablished' and yields @flagACK@;
-- * any other state is left alone and yields 0.
--
-- Read and update happen in one transaction: this function is reached both
-- from the stream actor and from writer threads, and a separate read and
-- write would let two callers both observe the initial state.
sendFlags::Stream -> IO Word16
sendFlags Stream{..} = STM.atomically $ do
  s <- TVar.readTVar streamState
  case s of
    StreamInit  -> do
     TVar.writeTVar streamState StreamSYNSent
     pure flagSYN
    StreamSYNReceived -> do
     TVar.writeTVar streamState StreamEstablished
     pure flagACK
    _ -> pure 0

-- | Apply the flags of a received frame to the stream state.
--
-- * ACK: a stream in 'StreamSYNSent' becomes 'StreamEstablished'; the
--   session is told that the stream is established.
-- * FIN: an open stream becomes 'StreamRemoteClose', a locally closed one
--   becomes 'StreamClosed'; in any other state the result is
--   @Just "ErrUnexpectedFlag"@.
-- * RST: the stream becomes 'StreamReset' and the session is asked to close
--   it. RST is applied last so that it wins over a FIN in the same frame.
--
-- All state changes happen in one transaction. Waiters are woken on FIN
-- (readers) and RST (readers and writers) so that they re-check the state
-- instead of running into their timeout.
processFlags::Stream -> Word16 -> IO (Maybe String)
processFlags Stream{..} flags = do
  result <- STM.atomically $ do
    curState <- TVar.readTVar streamState
    when (isACK && curState == StreamSYNSent) $
      TVar.writeTVar streamState StreamEstablished
    finResult <- if
      | not isFIN -> pure Nothing
      | L.elem curState [StreamSYNSent,StreamSYNReceived,StreamEstablished] -> do
          TVar.writeTVar streamState StreamRemoteClose
          pure Nothing
      | curState == StreamLocalClose -> do
          TVar.writeTVar streamState StreamClosed
          pure Nothing
      | otherwise -> pure $ Just "ErrUnexpectedFlag"
    when isRST $ TVar.writeTVar streamState StreamReset
    when (isFIN || isRST) $ do
      _ <- TMVar.tryPutTMVar recvNotifyCh ()
      pure ()
    when isRST $ do
      _ <- TMVar.tryPutTMVar sendNotifyCh ()
      pure ()
    pure finResult
  -- Messages to the session actor are sent outside the transaction.
  when isACK $ pushSessionActor sess (SendStreamEstablished  $ fromIntegral streamId)
  when isRST $ closeStream sess $ fromIntegral streamId
  pure result
 where
  isACK = (fromIntegral flags) .&. flagACK == flagACK
  isFIN = (fromIntegral flags) .&. flagFIN == flagFIN
  isRST = (fromIntegral flags) .&. flagRST == flagRST


-- | Write all bytes to the stream; a failure is printed to stdout and
-- otherwise ignored.
streamWrite::Stream -> BS.ByteString -> IO ()
streamWrite s bytes =
  runExceptT (innerWrite bytes s) >>= either report (const (pure ()))
 where
  report err = putStrLn ("[ERROR] " <> err)

-- | Write the whole buffer to the stream by repeatedly sending the part that
-- has not been sent yet; returns the number of bytes written.
innerWrite::BS.ByteString -> Stream -> ExceptT String IO Int
innerWrite bytes stream = go 0
 where
  total = BS.length bytes

  -- The argument is the number of bytes already sent.
  go::Int -> ExceptT String IO Int
  go sent
    | sent < total = do
        n <- innerWriteSlice (BS.drop sent bytes) stream
        go (sent + n)
    | otherwise = pure sent

-- | Send one data frame carrying a prefix of the buffer and return how many
-- bytes it carried. The frame size is limited by the send window, the
-- buffer length and @maxMessageSize - headerSize@.
--
-- Fails with @"ErrStreamClosed"@ / @"ErrConnectionReset"@ depending on the
-- stream state, with @"ErrTimeout"@ when the send window stays empty for
-- three seconds, and with the 'sendMessage' error when the frame cannot be
-- queued.
innerWriteSlice::BS.ByteString -> Stream -> ExceptT String IO Int
innerWriteSlice bytes stream@Stream{..} = do
  state <- lift $ TVar.readTVarIO streamState
  tryAssert "ErrStreamClosed" (state /= StreamClosed && state /= StreamLocalClose)
  tryAssert "ErrConnectionReset" (state /= StreamReset)
  sendWin <- lift $ TVar.readTVarIO sendWindow
  if sendWin == 0
    then do
          -- takeTMVar consumes the notification; merely reading it would
          -- leave the variable full and turn every later wait into a busy
          -- loop.
          ret <- lift $ race (threadDelay 3000000) (STM.atomically $ TMVar.takeTMVar sendNotifyCh)
          case ret of
            Left _  -> throwE "ErrTimeout"
            Right _ -> innerWriteSlice bytes stream
    else do
     -- Flags are taken only when a frame is really about to be sent:
     -- sendFlags advances the stream state, so calling it before waiting
     -- for window space would drop the SYN/ACK flag.
     flags <- lift $ sendFlags stream
     let maxSend = minimum [sendWin,BS.length bytes,(maxMessageSize $ config sess) - headerSize]
         hdr = mkHeader TypeData flags (fromIntegral streamId) (fromIntegral  maxSend)
     mayError <- lift $ sendMessage (shutdownCh sess) (sendChannel sess) hdr (Just $ BS.take maxSend bytes)
     maybe (pure ()) throwE mayError
     -- Decrement in place so that a window increase applied concurrently by
     -- the stream actor is not overwritten.
     newWin <- lift $ STM.atomically $ do
       TVar.modifyTVar sendWindow (subtract maxSend)
       TVar.readTVar sendWindow
     lift $ putStrLn $ "[INFO] New SendWindow Size ==== " <> (show newWin)
     pure maxSend

-- | Read exactly @readSize@ bytes from the stream's receive buffer.
--
-- Fails with @"ErrConnectionReset"@ on a reset stream, @"EOF"@ when the
-- remote side closed and the buffer is empty, @"read timeout"@ when no data
-- arrives within three seconds, and @"readSize > bufLen"@ when data is
-- buffered but less than requested.
-- NOTE(review): the last case makes the result depend on how much data has
-- arrived so far; callers cannot know that in advance - confirm whether a
-- short read would be the better contract.
--
-- After a successful read a window update is requested and the receive
-- notification is re-posted.
streamRead::Stream -> Int -> ExceptT String IO BS.ByteString
streamRead s@Stream{..} readSize = do
  buf <- innerRead
  _ <- lift $ STM.atomically $ TMVar.tryPutTMVar recvNotifyCh ()
  pure buf
 where
  innerRead:: ExceptT String IO BS.ByteString
  innerRead = do
    -- Inspect the state and cut the result out of the buffer in a single
    -- transaction; otherwise data appended by the stream actor between the
    -- read and the write-back would be lost.
    step <- lift $ STM.atomically $ do
      state <- TVar.readTVar streamState
      buf   <- TVar.readTVar recvBuf
      let bufLen       = BS.length buf
          remoteClosed = state == StreamClosed || state == StreamRemoteClose
      if
        | state == StreamReset -> pure $ Left "ErrConnectionReset"
        | remoteClosed && bufLen == 0 -> pure $ Left "EOF"
        | bufLen == 0 -> pure $ Right Nothing
        | readSize > bufLen -> pure $ Left "readSize > bufLen"
        | otherwise -> do
            let (r,m) = BS.splitAt readSize buf
            TVar.writeTVar recvBuf m
            pure $ Right (Just r)
    case step of
      Left err       -> throwE err
      Right (Just r) -> do
        _ <- lift $ sendWindowUpdate s
        pure r
      Right Nothing  -> do
        -- takeTMVar consumes the notification; merely reading it would
        -- leave the variable full and turn this wait into a busy loop.
        ret <- lift $ race (threadDelay 3000000) (STM.atomically $ TMVar.takeTMVar recvNotifyCh)
        case ret of
          Left _   -> throwE "read timeout"
          Right _  -> innerRead

-- | Put the stream into 'StreamReset' (unless it is already 'StreamClosed')
-- and post both notifications so that blocked readers and writers re-check
-- the state. Everything happens in one transaction, so the state cannot
-- change between the check and the update.
forceCloseStream::Stream -> IO ()
forceCloseStream Stream{..} = STM.atomically $ do
  state <- TVar.readTVar streamState
  when (state /= StreamClosed) $ TVar.writeTVar streamState StreamReset
  _ <- TMVar.tryPutTMVar recvNotifyCh ()
  _ <- TMVar.tryPutTMVar sendNotifyCh ()
  pure ()

-- | Deliver a message to the stream's actor; silently does nothing when no
-- actor is registered for the stream.
pushStreamActor::Stream -> StreamMessage -> IO ()
pushStreamActor Stream{..} smsg =
  TVar.readTVarIO streamActor >>= maybe (pure ()) deliver
 where
  deliver::Act.Actor StreamMessage -> IO ()
  deliver act = act Act.! smsg >> pure ()

-- | Deliver a message to the session's main actor; silently does nothing
-- when no main actor is registered.
pushSessionActor::Session -> SessionMainMessage -> IO ()
pushSessionActor Session{..} msg = do
  target <- TVar.readTVarIO mainActor
  case target of
    Nothing  -> pure ()
    Just act -> Act.send act msg >> pure ()